Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ORG] RU-AXAV: KAFKA-13149; Fix NPE when handling malformed record data in produce r… #19

Merged
merged 2 commits into from
Sep 16, 2021

Conversation

sakibguy
Copy link
Owner

…equests (apache#11080)

Raise InvalidRecordException from DefaultRecordBatch.readFrom instead of returning null if there are not enough bytes remaining to read the record. This ensures that the broker can raise a useful exception for malformed record batches.

Reviewers: Ismael Juma [email protected], Jason Gustafson [email protected]

More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.

Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

…equests (#11080)

Raise `InvalidRecordException` from `DefaultRecordBatch.readFrom` instead of returning null if there are not enough bytes remaining to read the record. This ensures that the broker can raise a useful exception for malformed record batches.

Reviewers: Ismael Juma <[email protected]>, Jason Gustafson <[email protected]>
@sakibguy sakibguy added the bug Something isn't working label Sep 15, 2021
@sakibguy sakibguy self-assigned this Sep 15, 2021
This patch fixes several problems with the `ElectLeaders` API in KRaft:

- `KafkaApis` did not properly forward this request type to the controller.
- `ControllerApis` did not handle the request type.
- `ElectLeadersRequest.getErrorResponse` may raise NPE when `TopicPartitions` is null.
- Controller should not do preferred election if `ElectLeaders` specifies `UNCLEAN` election.
- Controller should not do unclean election if `ElectLeaders` specifies `PREFERRED` election.
- Controller should use proper error codes to handle cases when desired leader is unavailable or when no election is needed because a desired leader is already elected.
- When election for all partitions is requested (indicated with null `TopicPartitions` field), the response should not return partitions for which no election was necessary.

In addition to extending the unit test coverage in `ReplicationControlManagerTest`, I have also converted `LeaderElectionCommandTest` to use KRaft.

Reviewers: dengziming <[email protected]>, José Armando García Sancio <[email protected]>, David Arthur <[email protected]>
@sakibguy sakibguy merged commit ec88e8b into sakibguy:trunk Sep 16, 2021
Copy link
Owner Author

@sakibguy sakibguy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good work

@@ -84,7 +84,7 @@
EXPIRE_DELEGATION_TOKEN(ApiMessageType.EXPIRE_DELEGATION_TOKEN, false, true),
DESCRIBE_DELEGATION_TOKEN(ApiMessageType.DESCRIBE_DELEGATION_TOKEN),
DELETE_GROUPS(ApiMessageType.DELETE_GROUPS),
ELECT_LEADERS(ApiMessageType.ELECT_LEADERS),
ELECT_LEADERS(ApiMessageType.ELECT_LEADERS, false, true),
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok


electionResults.add(electionResult);
electionResults.add(electionResult);
}
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay

@@ -208,7 +208,7 @@ class BrokerLifecycleManager(val config: KafkaConfig,
_state = BrokerState.PENDING_CONTROLLED_SHUTDOWN
// Send the next heartbeat immediately in order to let the controller
// begin processing the controlled shutdown as soon as possible.
scheduleNextCommunication(0)
scheduleNextCommunicationImmediately()
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

if (isTraceEnabled) {
trace(s"Sending broker registration ${data}")
if (isDebugEnabled) {
debug(s"Sending broker registration ${data}")
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

@@ -406,7 +406,7 @@ class BrokerLifecycleManager(val config: KafkaConfig,
scheduleNextCommunicationAfterSuccess()
}
} else {
info(s"The controlled has asked us to exit controlled shutdown.")
info(s"The controller has asked us to exit controlled shutdown.")
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

val createTopicResult = admin.createTopics(newTopics)
createTopicResult.all().get()
}

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

very good

))
assertTrue(e.getCause.isInstanceOf[TimeoutException])
}
}
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good job!

s"but found ${response.getClass}")
}
}

@AfterEach
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

very good work

val requestBuilder = new ElectLeadersRequest.Builder(ElectionType.PREFERRED, null, 30000)
testKraftForwarding(ApiKeys.ELECT_LEADERS, requestBuilder)
}

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay

requestBuilder
)
}

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay

@sakibguy sakibguy changed the title KAFKA-13149; Fix NPE when handling malformed record data in produce r… RU-AXAV: KAFKA-13149; Fix NPE when handling malformed record data in produce r… Jan 27, 2023
@sakibguy sakibguy changed the title RU-AXAV: KAFKA-13149; Fix NPE when handling malformed record data in produce r… [ORG] RU-AXAV: KAFKA-13149; Fix NPE when handling malformed record data in produce r… Mar 22, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants